feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73
Draft
malteos wants to merge 6 commits into
Draft
feat: pluggable RangeJob sources for repackage (cdx / sql{athena,duckdb} / csv)#73malteos wants to merge 6 commits into
malteos wants to merge 6 commits into
Conversation
…}/csv)
Rename the warc_by_cdx command to 'repackage' and make the range-job stage
pluggable behind a RangeJobSource abstraction:
- sources/: base (RangeJobSource + CostEstimate), sql_base (shared query
builder + crawl resolution), cdx, athena, duckdb (optional dep), csv
(reader + RangeJobCsvWriter), and a make_source factory.
- CLI: --target-source {cdx,sql,csv} with --engine {athena,duckdb}; shared
--hostnames/--query/--query-file; --duckdb-index-path; --csv-path; CSV
materialization via --range-jobs-output/--no-fetch/--csv-self-contained;
generalized --confirm-cost guard.
- WARCFilter: takes an injected source; orchestrator owns queueing, the
record limit, counting, and _STOP emission in a finally (fixes the prior
hung-readers bug when a source raised). Sources own their own stage-1
client/connection; WARCFilter manages only read/write S3 clients.
- DuckDB reads the CC columnar parquet directly via read_parquet with
per-crawl partition globbing; Athena unchanged in behaviour.
- setup.py: cdx_toolkit[duckdb] extra; conftest: requires_duckdb.
Tests: unit (sql_base, csv round-trip, make_source, confirm_cost, producer
stop-sentinel regression, --no-fetch); CSV round-trip e2e from the CDX
fixture; gated Athena/DuckDB e2e (CC-MAIN-2026-17/commoncrawl.org, single
partition). DuckDB e2e verified live; 205 passed.
Extend the guided SQL filter (athena + duckdb) to match on url_host_registered_domain in addition to url_host_name. --hostnames and --domains can be combined (OR-ed); at least one (or a raw --query) is required. TLD optimizer hint is derived from both. Also make the DuckDB source resilient to transient S3 read timeouts (http_timeout/http_retries). Tests: unit coverage for domain-only / combined host+domain query building (athena + duckdb) and factory validation; gated DuckDB domain e2e (CC-MAIN-2026-17 / commoncrawl.org, --limit 10) verified live.
A raw --query/--query-file can SELECT analysis columns (e.g. content_languages) beyond the required warc_filename/offset/length; those extra columns now flow through to the materialized range-jobs CSV (RangeJob.extra -> lazy CSV header). Rename the CSV fetch-job columns to the index's WARC names (warc_filename, warc_record_offset, warc_record_length; warc_url for self-contained) so they never collide with the index's own columns (e.g. the page-URL 'url' column). The CSV reader auto-detects mode from warc_url/warc_filename and warns+prefers warc_url if both are present.
… locality Group records of the same WARC file with ascending offsets to improve S3 range-read locality (and to enable later coalescing). Applied as an ORDER BY on guided SQL queries (Athena + DuckDB) and as an in-memory sort when loading a CSV source. Raw --query and cdx sources keep their order. New --no-sort-ranges opts out (e.g. already-sorted / very large CSV, or to preserve original order).
Add `cdxt repackage --processes N`: shard range jobs by warc_filename across N worker processes (one asyncio event loop per CPU core), then merge the per-process shards into a single <prefix>.warc.gz with one warcinfo record (server-side S3 UploadPartCopy, or fsspec streaming locally). Reads from s3://commoncrawl in-region. The work is many small independent range-GETs; the limiter is request-rate on one core. Multi-process gives ~2.6x on a 4-vCPU c5n.xlarge (~457 -> ~1130 rec/s). - one writer per process; removed the multi-writer-per-process fan-out (num_writers / fetcher_to_consumer_ratio / --parallel_writers) and the now-vestigial writer_id segment from output filenames - only the first shard writes the warcinfo; all shards share one canonical WARC-Record-ID and the warcinfo filename names the merged file - optional uvloop event loop via CDXT_UVLOOP=1 (~+8% single-core) - new modules: filter_warc/multiprocess.py, filter_warc/merge.py - docs: README repackage section, CHANGELOG, docs/notes/warc-fetcher-performance.md - tests updated for the new filename scheme and single-writer model Claude-Session: https://claude.ai/code/session_011XtVGuu26tiQpAbdPhnixk
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Makes the WARC-repackaging command's range-job stage pluggable behind a
RangeJobSourceabstraction, and renames the command
warc_by_cdx→repackage(it now repackages WARC rangesfrom several sources, not just CDX).
Range jobs (a WARC file + byte range) can now come from:
--target-source--enginecdxsqlathenasqlduckdbread_parquet)csvAthena and DuckDB share one SQL core (
sources/sql_base.py); they differ only in theFROMclauseand execution. The pipeline orchestrator now owns queueing, the record limit, counting, and
stop-sentinel emission (in a
finally) — which also fixes a latent bug where a source raisingmid-run left the WARC readers hung forever. Each source owns its own stage-1 client/connection;
WARCFiltermanages only the read/write S3 clients.CLI usage
Global flags (
--crawl,--limit) come before therepackagesubcommand; source flags come after.CDX files (default)
cdxt repackage \ --target-source cdx \ --cdx-path filtered_CC-MAIN-2024-30.cdx.gz \ --prefix ./out/EXAMPLE \ --warc-download-prefix https://data.commoncrawl.org # multiple indices via glob: cdxt repackage --target-source cdx --cdx-path s3://bucket/cdx/ --cdx-glob '*.cdx.gz' --prefix ./out/EXAMPLESQL — Athena (guided by hostnames/domains, pruned by crawl)
cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine athena \ --hostnames commoncrawl.org www.commoncrawl.org \ --athena-database ccindex \ --athena-s3-output s3://my-bucket/athena-results/ \ --prefix s3://my-bucket/out/CC \ --warc-download-prefix s3://commoncrawlThe guided filter matches on
url_host_name(exact host) via--hostnamesand/orurl_host_registered_domainvia--domains(which also covers subdomains). They can be combined(predicates are OR-ed):
# every host under the example.com registered domain, plus one exact extra host cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine duckdb \ --domains example.com \ --hostnames blog.example.org \ --prefix ./out/EXSQL — DuckDB (reads the public parquet directly; no Athena charge)
cdxt --crawl CC-MAIN-2026-17 repackage \ --target-source sql --engine duckdb \ --hostnames commoncrawl.org \ --prefix ./out/CC # requires the optional dependency: pip install cdx_toolkit[duckdb]SQL — raw query (power users)
The query must
SELECT warc_filename, warc_record_offset, warc_record_length:cdxt repackage \ --target-source sql --engine athena \ --query "SELECT warc_filename, warc_record_offset, warc_record_length FROM ccindex WHERE subset = 'warc' AND crawl = 'CC-MAIN-2026-17' AND url_host_registered_domain = 'commoncrawl.org' AND content_mime_type = 'application/pdf' LIMIT 5000" \ --athena-database ccindex \ --athena-s3-output s3://my-bucket/athena-results/ \ --prefix ./out/PDFS \ --confirm-cost # or load it from a file with --query-file ./my_query.sqlCSV (consume a previously materialized range-jobs file)
cdxt repackage \ --target-source csv \ --csv-path ranges.csv \ --prefix ./out/CC \ --warc-download-prefix https://data.commoncrawl.orgCost guard
SQL index scans bill by data scanned (Athena: ~$5/TB), so
repackageprompts before a potentiallyexpensive query. It runs without a prompt only when the query is restricted to ≤ 10 crawls.
Otherwise (no
--crawl→ all crawls, > 10 crawls, or a raw--querywhose pruning can't beverified) it asks for confirmation; in a non-interactive shell it aborts unless
--confirm-costispassed.
cdxandcsvsources never prompt.Range-jobs CSV (materialization)
Any source can write its resolved range jobs to a CSV with
--range-jobs-output. Add--no-fetchto only produce the CSV (cheap — no WARC download), then consume it later. This decouples the
(possibly expensive) index query from extraction, makes runs reproducible/shareable, and lets the
whole reader/writer pipeline be tested without AWS.
CSV formats
Default (relative filename) — the consumer prepends
--warc-download-prefix:Self-contained (
--csv-self-contained) — full URLs, used as-is (no prefix needed). The readerauto-detects the mode from the header (
warc_urlvswarc_filename); if both are present it warnsand uses
warc_url:.tsv/.tsv.gzinputs are read as tab-delimited;.gzinputs are decompressed.Extra columns from a raw
--query— any column a raw--query/--query-fileSELECTs beyond therequired
warc_filename, warc_record_offset, warc_record_lengthis carried through to therange-jobs CSV (handy for later analysis), e.g. adding
content_languages:The fetch-job columns are named
warc_*precisely so they never collide with the index's owncolumns (e.g. the page-URL column
urlflows through as an ordinary extra column, distinct fromwarc_url). These extra columns don't affect fetching.Fetch-time ordering
Range jobs are sorted by
(warc_filename, warc_record_offset)by default, grouping records of thesame WARC file with ascending offsets for better S3 range-read locality (and to enable coalescing
adjacent ranges later). This is applied as an
ORDER BYon a guided SQL query (Athena/DuckDB) and asan in-memory sort when loading a CSV source. A raw
--query/--query-fileis left untouched (orderit yourself), and
cdxsources keep their native SURT order. Pass--no-sort-rangesto disable —useful for an already-sorted or very large CSV (the CSV sort buffers all rows in memory), or to
preserve a source's original order.
Optional dependency
DuckDB is optional:
pip install cdx_toolkit[duckdb]. Without it, the other sources work unchanged;selecting
--engine duckdbraises a clear error.Testing
--domains/ combined host+domain),escape_sql_literalinjection rejection, CSVreader/writer round-trip in both modes + header auto-detection, the
make_sourcefactoryvalidation, the
confirm_costguard,--no-fetch, and a regression test that the producer stillreleases readers (
_STOP) when a source raises.CC-MAIN-2026-17/commoncrawl.org(single partition — cheap, never an all-crawls scan), including a DuckDB
--domainsrun.Verified locally: full suite 205 passed; DuckDB e2e ran live and passed; all 6 S3
aioboto3 read/write tests pass; flake8 clean. (Athena gated test skips where Athena permissions
aren't available; its code path is covered by unit tests and shares the orchestration/fetch path
proven by the DuckDB e2e.)